home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- __revision__ = '$Id: cache.py 647 2006-08-26 18:27:39Z jajcus $'
- __docformat__ = 'restructuredtext en'
- import threading
- from datetime import datetime, timedelta
# Rank of every cache item state.  A larger number means a "worse" state;
# the ranks are used both for lookups ("is this item good enough?") and for
# ordering items so that the worst ones are purged first.
_state_values = {
    'new': 0,
    'fresh': 1,
    'old': 2,
    'stale': 3,
    'purged': 4,
}


class CacheItem(object):
    """A cached value together with the schedule on which it ages.

    An item moves through the states ``'new'`` -> ``'fresh'`` -> ``'old'``
    -> ``'stale'`` -> ``'purged'``.  The state only advances when
    `update_state` is called.

    :Ivariables:
        - `address`: key under which the value is cached.
        - `value`: the cached value.
        - `state`: current state name (a key of `_state_values`).
        - `state_value`: numeric rank of `state`.
        - `timestamp`: creation time (naive UTC).
        - `freshness_time`: moment after which a fresh item becomes old.
        - `expire_time`: moment after which an old item becomes stale.
        - `purge_time`: moment after which a stale item becomes purged.
        - `_lock`: re-entrant lock held while the state is updated.
    """
    __slots__ = [
        'value',
        'address',
        'state',
        'timestamp',
        'freshness_time',
        'expire_time',
        'purge_time',
        'state_value',
        '_lock',
    ]

    def __init__(self, address, value, freshness_period, expiration_period,
                 purge_period, state='new'):
        """Create a cache item stamped with the current UTC time.

        :Parameters:
            - `address`: key of the item.
            - `value`: value to store.
            - `freshness_period`: `timedelta` the item stays fresh.
            - `expiration_period`: `timedelta` after which the item is stale.
            - `purge_period`: `timedelta` after which the item is purged;
              ``None`` (or a zero interval) means the item is never purged.
            - `state`: initial state name.

        :raise ValueError: when the periods are not in non-decreasing order.
        """
        if freshness_period > expiration_period:
            raise ValueError('freshness_period greater then expiration_period')
        # ``None`` means "never purge" (handled below).  Comparing a timedelta
        # with None raises TypeError, so the ordering check must be skipped.
        if purge_period is not None and expiration_period > purge_period:
            raise ValueError('expiration_period greater then purge_period')

        self.address = address
        self.value = value
        now = datetime.utcnow()
        self.timestamp = now
        self.freshness_time = now + freshness_period
        self.expire_time = now + expiration_period
        if purge_period:
            self.purge_time = now + purge_period
        else:
            self.purge_time = datetime.max
        self.state = state
        self.state_value = _state_values[state]
        self._lock = threading.RLock()

    def update_state(self):
        """Advance the state according to the current UTC time.

        A ``'new'`` item always becomes ``'fresh'``.  Each later transition
        happens only when the matching deadline is strictly in the past, and
        several transitions may happen in a single call.

        :return: the (possibly unchanged) state name.
        """
        self._lock.acquire()
        try:
            now = datetime.utcnow()
            if self.state == 'new':
                self.state = 'fresh'
            if self.state == 'fresh' and now > self.freshness_time:
                self.state = 'old'
            if self.state == 'old' and now > self.expire_time:
                self.state = 'stale'
            if self.state == 'stale' and now > self.purge_time:
                self.state = 'purged'
            self.state_value = _state_values[self.state]
            return self.state
        finally:
            self._lock.release()

    def _compare(self, other):
        """Return a negative, zero or positive number ordering self vs other.

        Items are ordered worst state first, then oldest first, with the
        object identity as the final tie-breaker.  When `other` lacks the
        item attributes, only the object identities are compared.
        """
        try:
            mine = (-self.state_value, self.timestamp, id(self))
            theirs = (-other.state_value, other.timestamp, id(other))
        except AttributeError:
            mine, theirs = id(self), id(other)
        return (mine > theirs) - (mine < theirs)

    def __cmp__(self, other):
        """Three-way comparison, used by interpreters that support it."""
        return self._compare(other)

    # Rich comparisons mirror `__cmp__` so that sorting also works on
    # interpreters that have no ``cmp`` protocol.  ``==`` is intentionally
    # left as the default: the ordering key contains ``id(self)``, so two
    # distinct items never compare equal anyway.
    def __lt__(self, other):
        return self._compare(other) < 0

    def __le__(self, other):
        return self._compare(other) <= 0

    def __gt__(self, other):
        return self._compare(other) > 0

    def __ge__(self, other):
        return self._compare(other) >= 0


# One hour; the default cache periods are expressed as multiples of it.
_hour = timedelta(hours=1)
-
class CacheFetcher:
    """Base class for objects that retrieve one value on behalf of a cache.

    A subclass implements `fetch` and reports the outcome by calling exactly
    one of `got_it`, `error` or `timeout`.  After the first report the
    fetcher is inactive and further reports are ignored.

    :Ivariables:
        - `cache`: the owning cache; must provide ``add_item``, ``get_item``,
          ``invalidate_object`` and ``remove_fetcher``.
        - `address`: key of the object being fetched.
        - `timeout_time`: naive UTC moment at which the fetch times out
          (``datetime.max`` when no timeout period was given).
        - `active`: ``True`` until the fetcher has been deactivated.
    """

    def __init__(self, cache, address, item_freshness_period,
                 item_expiration_period, item_purge_period, object_handler,
                 error_handler, timeout_handler, timeout_period,
                 backup_state=None):
        """Store the fetch parameters and compute the timeout moment.

        :Parameters:
            - `cache`: cache that requested the fetch.
            - `address`: key of the object to fetch.
            - `item_freshness_period`, `item_expiration_period`,
              `item_purge_period`: periods passed to the `CacheItem` created
              by `got_it`.
            - `object_handler`: called as ``(address, value, state)`` when a
              value is available.
            - `error_handler`: called as ``(address, error_data)`` on failure.
            - `timeout_handler`: called as ``(address)`` on timeout; when
              false, `error_handler` is called with ``None`` instead.
            - `timeout_period`: `timedelta` until the timeout; a false value
              disables the timeout.
            - `backup_state`: worst cached state acceptable as a fallback
              when the fetch fails, or a false value for no fallback.
        """
        self.cache = cache
        self.address = address
        self._item_freshness_period = item_freshness_period
        self._item_expiration_period = item_expiration_period
        self._item_purge_period = item_purge_period
        self._object_handler = object_handler
        self._error_handler = error_handler
        self._timeout_handler = timeout_handler
        if timeout_period:
            self.timeout_time = datetime.utcnow() + timeout_period
        else:
            self.timeout_time = datetime.max
        self._backup_state = backup_state
        self.active = True

    def _deactivate(self):
        """Unregister from the cache and mark this fetcher inactive."""
        self.cache.remove_fetcher(self)
        # The cache calls `_deactivated` only when it knew this fetcher;
        # make sure the flag is cleared in the other case too.
        if self.active:
            self._deactivated()

    def _deactivated(self):
        """Mark the fetcher inactive (also called by the cache)."""
        self.active = False

    def fetch(self):
        """Start the retrieval.  Must be overridden by subclasses.

        :raise NotImplementedError: always (a `RuntimeError` subclass, so
            handlers written for `RuntimeError` still catch it).
        """
        raise NotImplementedError('Pure virtual method called')

    def got_it(self, value, state='new'):
        """Report a successfully retrieved value.

        The value is handed to the object handler, stored in the cache and
        the fetcher is deactivated.  Ignored when already inactive.
        """
        if not self.active:
            return None
        item = CacheItem(self.address, value, self._item_freshness_period,
                         self._item_expiration_period,
                         self._item_purge_period, state)
        self._object_handler(item.address, item.value, item.state)
        self.cache.add_item(item)
        self._deactivate()

    def error(self, error_data):
        """Report a failed retrieval.

        A cached backup item is delivered when allowed and available;
        otherwise the error handler is called.  The cached object is then
        invalidated and the fetcher deactivated.  Ignored when inactive.
        """
        if not self.active:
            return None
        if not self._try_backup_item():
            self._error_handler(self.address, error_data)
        self.cache.invalidate_object(self.address)
        self._deactivate()

    def timeout(self):
        """Report that the retrieval timed out.

        Behaves like `error`, except that the timeout handler is preferred
        over the error handler when one was supplied.
        """
        if not self.active:
            return None
        if not self._try_backup_item():
            if self._timeout_handler:
                self._timeout_handler(self.address)
            else:
                self._error_handler(self.address, None)
        self.cache.invalidate_object(self.address)
        self._deactivate()

    def _try_backup_item(self):
        """Deliver a cached item in `backup_state` or better, if any.

        :return: ``True`` when an item was passed to the object handler,
            ``False`` otherwise.
        """
        if not self._backup_state:
            return False
        item = self.cache.get_item(self.address, self._backup_state)
        if not item:
            return False
        self._object_handler(item.address, item.value, item.state)
        return True
-
class Cache:
    """Size-limited cache of `CacheItem` objects keyed by address.

    Missing or too-old objects are retrieved through fetcher objects created
    from the class registered with `set_fetcher`.  All methods except
    `num_items` hold the cache's re-entrant lock while they run.

    :Ivariables:
        - `default_freshness_period`, `default_expiration_period`,
          `default_purge_period`: periods used when a request gives none.
        - `max_items`: size above which `add_item` purges before storing.
        - `_items`: mapping of address to the current item.
        - `_items_list`: the items, kept sorted worst-first.
        - `_fetcher`: fetcher class, or ``None``.
        - `_active_fetchers`: ``(timeout_time, fetcher)`` pairs sorted by
          timeout.
        - `_purged`: number of purged items seen since the last purge.
    """

    def __init__(self, max_items, default_freshness_period=_hour,
                 default_expiration_period=12 * _hour,
                 default_purge_period=24 * _hour):
        """Create an empty cache with no fetcher registered."""
        self.default_freshness_period = default_freshness_period
        self.default_expiration_period = default_expiration_period
        self.default_purge_period = default_purge_period
        self.max_items = max_items
        self._items = {}
        self._items_list = []
        self._fetcher = None
        self._active_fetchers = []
        self._purged = 0
        self._lock = threading.RLock()

    def request_object(self, address, state, object_handler,
                       error_handler=None, timeout_handler=None,
                       backup_state=None, timeout=timedelta(minutes=60),
                       freshness_period=None, expiration_period=None,
                       purge_period=None):
        """Deliver an object from the cache, fetching it when necessary.

        :Parameters:
            - `address`: key of the object.
            - `state`: worst acceptable state of a cached item; ``'stale'``
              is treated as ``'purged'``.
            - `object_handler`: called as ``(address, value, state)``.
            - `error_handler`: called as ``(address, error_data)``; defaults
              to calling `object_handler` with ``(address, None, 'error')``.
            - `timeout_handler`: called as ``(address)``; defaults to calling
              `error_handler` with ``(address, None)``.
            - `backup_state`: worst state acceptable if the fetch fails.
            - `timeout`: `timedelta` allowed for the fetch.
            - `freshness_period`, `expiration_period`, `purge_period`:
              periods for the fetched item; ``None`` selects the defaults.

        :raise TypeError: when a fetch is needed but no fetcher is set.
        """
        self._lock.acquire()
        try:
            if state == 'stale':
                state = 'purged'

            item = self.get_item(address, state)
            if item:
                object_handler(item.address, item.value, item.state)
                return None

            if not self._fetcher:
                raise TypeError('No cache fetcher defined')

            if not error_handler:
                def default_error_handler(address, _unused):
                    return object_handler(address, None, 'error')
                error_handler = default_error_handler

            if not timeout_handler:
                def default_timeout_handler(address):
                    return error_handler(address, None)
                timeout_handler = default_timeout_handler

            if freshness_period is None:
                freshness_period = self.default_freshness_period
            if expiration_period is None:
                expiration_period = self.default_expiration_period
            if purge_period is None:
                purge_period = self.default_purge_period

            fetcher = self._fetcher(self, address, freshness_period,
                                    expiration_period, purge_period,
                                    object_handler, error_handler,
                                    timeout_handler, timeout, backup_state)
            fetcher.fetch()
            # A fetcher that completed inside fetch() has already tried to
            # unregister itself; registering it now would leave a dead entry
            # in the list forever.
            if getattr(fetcher, 'active', True):
                self._active_fetchers.append((fetcher.timeout_time, fetcher))
                # Order by deadline only, so fetcher objects are never
                # compared with each other when deadlines tie.
                self._active_fetchers.sort(key=lambda entry: entry[0])
        finally:
            self._lock.release()

    def invalidate_object(self, address, state='stale'):
        """Force the cached item into a worse state.

        Nothing happens when the item is missing or already in `state` or a
        worse one.
        """
        self._lock.acquire()
        try:
            # Look the item up with the most permissive limit; the default
            # ('fresh') would hide exactly the older items that may still
            # need downgrading.
            item = self.get_item(address, 'purged')
            if item and item.state_value < _state_values[state]:
                item.state = state
                item.update_state()
                self._items_list.sort()
        finally:
            self._lock.release()

    def add_item(self, item):
        """Store `item`, replacing any item with the same address.

        Items that are already purged are not stored.  When the cache holds
        `max_items` or more, `purge_items` runs first.

        :return: the state of `item` after its state was updated.
        """
        self._lock.acquire()
        try:
            state = item.update_state()
            if state != 'purged':
                if len(self._items_list) >= self.max_items:
                    self.purge_items()
                # Drop the item being replaced from the ordered list too;
                # otherwise purging it later would delete the mapping that
                # by then belongs to its replacement.
                previous = self._items.get(item.address)
                if previous is not None:
                    for index, existing in enumerate(self._items_list):
                        if existing is previous:
                            del self._items_list[index]
                            break
                self._items[item.address] = item
                self._items_list.append(item)
                self._items_list.sort()
            return item.state
        finally:
            self._lock.release()

    def get_item(self, address, state='fresh'):
        """Return the cached item if it is in `state` or a better one.

        The item's state is updated first.

        :return: the item, or ``None`` when missing or in a worse state.
        """
        self._lock.acquire()
        try:
            item = self._items.get(address)
            if not item:
                return None
            self.update_item(item)
            if _state_values[state] >= item.state_value:
                return item
            return None
        finally:
            self._lock.release()

    def update_item(self, item):
        """Update the state of `item` and re-sort the item list.

        Purged items are counted; once the count exceeds a quarter of
        `max_items`, `purge_items` is run.

        :return: the state of the item.
        """
        self._lock.acquire()
        try:
            state = item.update_state()
            self._items_list.sort()
            if item.state == 'purged':
                self._purged += 1
                if self._purged > 0.25 * self.max_items:
                    self.purge_items()
            return state
        finally:
            self._lock.release()

    def num_items(self):
        """Return the number of items in the ordered item list."""
        return len(self._items_list)

    def _drop_mapping(self, item):
        """Remove the address mapping of `item` if it still points at it."""
        if self._items.get(item.address) is item:
            del self._items[item.address]

    def purge_items(self):
        """Remove overlimit and purged items.

        The list is first trimmed to 75% of `max_items` (worst items first),
        then every leading item whose updated state is ``'purged'`` is
        removed as well.
        """
        self._lock.acquire()
        try:
            ordered = self._items_list
            excess = len(ordered) - int(0.75 * self.max_items)
            for _unused in range(excess):
                self._drop_mapping(ordered.pop(0))
            while ordered and ordered[0].update_state() == 'purged':
                self._drop_mapping(ordered.pop(0))
            # The leading purged items are gone; start counting afresh so
            # that `update_item` does not trigger a purge on every call.
            self._purged = 0
        finally:
            self._lock.release()

    def tick(self):
        """Time out overdue fetchers and purge the cache.

        Meant to be called periodically.
        """
        self._lock.acquire()
        try:
            now = datetime.utcnow()
            # Iterate over a copy: timeout() makes the fetcher remove itself
            # from the list.
            for deadline, fetcher in list(self._active_fetchers):
                if deadline > now:
                    break
                fetcher.timeout()
            self.purge_items()
        finally:
            self._lock.release()

    def remove_fetcher(self, fetcher):
        """Forget an active fetcher and mark it deactivated.

        Nothing happens when the fetcher is not registered.
        """
        self._lock.acquire()
        try:
            for index, (_deadline, candidate) in enumerate(self._active_fetchers):
                if candidate is fetcher:
                    del self._active_fetchers[index]
                    candidate._deactivated()
                    return None
        finally:
            self._lock.release()

    def set_fetcher(self, fetcher_class):
        """Set the class used to create fetchers (``None`` to disable)."""
        self._lock.acquire()
        try:
            self._fetcher = fetcher_class
        finally:
            self._lock.release()
-
-
class CacheSuite:
    """A set of `Cache` objects, one per object class.

    A cache is created the first time a fetcher is registered for an object
    class; every cache is built with the suite's `max_items` and default
    periods.  All methods hold the suite's re-entrant lock while they run.
    """

    def __init__(self, max_items, default_freshness_period=_hour,
                 default_expiration_period=12 * _hour,
                 default_purge_period=24 * _hour):
        """Remember the settings used for caches created later."""
        self.max_items = max_items
        self.default_freshness_period = default_freshness_period
        self.default_expiration_period = default_expiration_period
        self.default_purge_period = default_purge_period
        self._caches = {}
        self._lock = threading.RLock()

    def request_object(self, object_class, address, state, object_handler,
                       error_handler=None, timeout_handler=None,
                       backup_state=None, timeout=None,
                       freshness_period=None, expiration_period=None,
                       purge_period=None):
        """Forward the request to the cache registered for `object_class`.

        All remaining arguments are passed on unchanged to that cache's
        ``request_object``.

        :raise TypeError: when no cache exists for `object_class`.
        """
        self._lock.acquire()
        try:
            if object_class not in self._caches:
                raise TypeError('No cache for %r' % (object_class,))
            target = self._caches[object_class]
            target.request_object(address, state, object_handler,
                                  error_handler, timeout_handler,
                                  backup_state, timeout, freshness_period,
                                  expiration_period, purge_period)
        finally:
            self._lock.release()

    def tick(self):
        """Call ``tick`` on every cache in the suite."""
        self._lock.acquire()
        try:
            for member in self._caches.values():
                member.tick()
        finally:
            self._lock.release()

    def register_fetcher(self, object_class, fetcher_class):
        """Set the fetcher class for `object_class`.

        The cache for `object_class` is created first when there is none.
        """
        self._lock.acquire()
        try:
            target = self._caches.get(object_class)
            if not target:
                target = Cache(self.max_items,
                               self.default_freshness_period,
                               self.default_expiration_period,
                               self.default_purge_period)
                self._caches[object_class] = target
            target.set_fetcher(fetcher_class)
        finally:
            self._lock.release()

    def unregister_fetcher(self, object_class):
        """Clear the fetcher of the cache for `object_class`, if any.

        The cache itself is kept.
        """
        self._lock.acquire()
        try:
            target = self._caches.get(object_class)
            if target:
                target.set_fetcher(None)
            return None
        finally:
            self._lock.release()
-
-
-